package com.stay4it.rxjava;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import com.stay4it.rxjava.bean.Student;

import rx.Notification;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.schedulers.TimeInterval;
import rx.schedulers.Timestamped;

/**
 * RxJava辅助操作符 http://blog.csdn.net/jdsjlzx/article/details/54768360
 * 
 */
public class RxJava12 {

	private static List<Student> studentList = new ArrayList<Student>() {
		{
			add(new Student("Stay", 28));
			add(new Student("谷歌小弟", 23));
			add(new Student("Star", 25));
		}
	};
	
	/**
	 * Delay 
	 * 
	 * delay延迟发射
	 * @throws InterruptedException 
	 */
	private static void test1() throws InterruptedException {
		
		Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(5);
		observable
				.delay(3, TimeUnit.SECONDS)
		        .subscribe(new Subscriber<Long>() {
		            @Override
		            public void onCompleted() {
		                System.out.println("onCompleted");
		            }
		            @Override
		            public void onError(Throwable e) {
		            	System.out.println("onError");
		            }
		            @Override
		            public void onNext(Long value) {
		            	System.out.println("delay onNext value :"+ value);
		            }
		        });
		
		Thread.sleep(10000); 
		
	}
	
	/**
	 * Delay 
	 * 
	 * delaySubscription延迟订阅
	 * @throws InterruptedException 
	 */
	private static void test2() throws InterruptedException {
		
		Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(5);
		
		observable
				.delaySubscription(3, TimeUnit.SECONDS)
		        .subscribe(new Subscriber<Long>() {
		            @Override
		            public void onCompleted() {
		                System.out.println("onCompleted");
		            }
		            @Override
		            public void onError(Throwable e) {
		            	System.out.println("onError");
		            }
		            @Override
		            public void onNext(Long value) {
		            	System.out.println("delaySubscription onNext value :"+ value);
		            }
		        });
		
		Thread.sleep(10000); 
	}
	
	/**
	 * Do 
	 * 解释：用于给Observable的生命周期的各个阶段加上回调监听，Rxjava实现了很多的doxxx操作符。
	 * 
	 * doOnEach: Observable每发射一个数据的时候都会触发的回调，包括onError和onCompleted。
	 * doOnNext: 只有onNext的时候才会被触发.
	 * doOnSubscribe: Subscriber进行订阅的时候触发,
	 * doOnUnSubscribe: Subscriber进行反订阅的时候触发,通过OnError或者OnCompleted结束的时候，会反订阅所有的Subscriber。
	 * doOnError: OnError发生的时候触发回调
	 * doOnComplete: OnCompleted发生的时候触发回调
	 * doOnTerminate: 在Observable结束前触发回调，无论是正常还是异
	 * finallyDo: 在Observable结束后触发回调，无论是正常还是异常终止。   注意：finallyDo已经过时，最新的实现是doAfterTerminate
	 * 
	 */
	private static void test3() {
		
		Observable.just(1,2,3,4,5,6)
		.doOnEach(new Action1<Notification<? super Integer>>() {

			@Override
			public void call(Notification<? super Integer> t) {
				System.out.println("doOnEach value = " + t.getValue());
			}
		})
		.doOnSubscribe(new Action0() {//被订阅时回调
			
			@Override
			public void call() {
				System.out.println("doOnSubscribe");		
			}
		})
		.doOnCompleted(new Action0() {//数据序列发送完毕回调
			
			@Override
			public void call() {
				System.out.println("doOnCompleted");				
			}
		})
		.doAfterTerminate(new Action0() { //Observable终止之后会被调用，无论是正常还是异常终止
			
			@Override
			public void call() {
				System.out.println("doAfterTerminate");		
			}
		})
		.doOnError(new Action1<Throwable>() {

			@Override
			public void call(Throwable t) {
				System.out.println("doOnError " + t);
				
			}
		})
		.subscribe(new Subscriber<Integer>() {
		    @Override
		    public void onCompleted() {
		        System.out.println("onCompleted");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Integer integer) {
		        System.out.println("onNext: " + integer);
		    }
		});

	}
	
	/**
	 * Materialize/Dematerialize
	 * 
	 *  Meterialize操作符将OnNext/OnError/OnCompleted都转化为一个Notification对象并按照原来的顺序发射出来，
	 *  而DeMeterialize则是执行相反的过程。
	 *  
	 *  注意：在调用dematerialize()之前必须先调用materialize()，否则会报错。
	 *  //http://www.myexception.cn/mobile/2088771.html
	 */
	private static void test4(){
		Observable.just(1,2,3,4,5,6).materialize()
		.subscribe(new Action1<Notification>() {

			@Override
			public void call(Notification t) {
				//这里，onComplete也被meterialize包装后发射了出来，onError也同样。
				System.out.println("meterialize:" + t.getValue() + "   type " + t.getKind());				
			}
		});
		System.out.println("-------------------------");	
		
		//将Notification逆转为普通消息发射
		Observable<Integer> observable = Observable.just(1,2,3,4,5,6).materialize().dematerialize();
		observable.subscribe(new Action1<Integer>() {

			@Override
			public void call(Integer value) {
				System.out.println("onNext: " + value);
			}
		});
		
	}
	
	/**
	 * ObserveOn/SubscribeOn
	 * 
	 * SubscribeOn：指定Observable自身在哪个调度器上执行（即在那个线程上运行），如果Observable需要执行
	 * 耗时操作，一般我们可以让其在新开的一个子线程上运行，好比AsyncTask的doInBackground方法。 
	 * 
	 * ObserveOn：可以使用observeOn操作符指定Observable在哪个调度器上发送通知给观察者（调用观察者的
	 * onNext,onCompleted,onError方法）。一般我们可以指定在主线程中观察，这样就可以修改UI，相当于
	 * AsyncTask的onPreExecute 、onPrograssUpdate和onPostExecute 方法中执行
	 * 
	 * 这两个操作符对于Android开发来说非常适用，因为Android中只能在主线程中修改UI，耗时操作不能在主线程中执行，
	 * 所以我们经常会创建新的Thread去执行耗时操作，然后配合Handler修改UI，或者使用AsyncTask。RxJava中使用
	 * 这两个操作符能够让我们非常方便的处理各种线程问题。
	 * 
	 * 
	 * @throws InterruptedException 
	 * 
	 */
	private static void test5() throws InterruptedException{
		Observable<Integer> observable = Observable.just(1,2);
		
		observable.subscribeOn(Schedulers.io())
		.map(new Func1<Integer, Integer>() {

			@Override
			public Integer call(Integer t) {
				System.out.println("map thread : " + Thread.currentThread().getName());
				return t * 10;
			}
		})
		.observeOn(Schedulers.newThread())//对应android是：AndroidSchedulers.mainThread()
		.subscribe(new Action1<Integer>() {
			
			@Override
			public void call(Integer value) {
				System.out.println("subscribe thread : " + Thread.currentThread().getName());
				System.out.println("onNext: " + value);
			}
		});
		
		Thread.sleep(2000); 
	}
	
	/**
	 * TimeInterval(间隔)
	 * TimeInterval操作符拦截原始Observable发射的数据项，替换为两个连续发射物之间流逝的时间长度。 
	 * 也就是说这个使用这个操作符后发射的不再是原始数据，而是原始数据发射的时间间隔。
	 * 
	 * 
	 * @throws InterruptedException 
	 */
	private static void test6() throws InterruptedException{
		Observable.create(new Observable.OnSubscribe<Integer>() {
			@Override
			public void call(Subscriber<? super Integer> subscriber) {
				for (int i = 3; i <= 7; i++) {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					subscriber.onNext(i);
				}
				subscriber.onCompleted();
			}
		})
		.subscribeOn(Schedulers.io())
		.timeInterval()
		.subscribe(new Subscriber<TimeInterval<Integer>>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted");
			}

			@Override
			public void onError(Throwable e) {
			}

			@Override
			public void onNext(TimeInterval<Integer> t) {
				System.out.println("onNext: " + t.getValue() + " - " + t.getIntervalInMilliseconds());
			}
		});
		
		Thread.sleep(6000); 
	}
	
	
	/**
	 * Timeout
	 * 如果原始Observable过了指定的一段时长没有发射任何数据，Timeout操作符会以一个onError通知终止这个Observable，
	 * 或者继续一个备用的Observable。
	 * 
	 * 1.timeout(long,TimeUnit)： 第一个变体接受一个时长参数，每当原始Observable发射了一项数据，timeout就启动一个计时器，
	 * 如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据，timeout就抛出TimeoutException，以一个错误通知
	 * 终止Observable。 这个timeout默认在computation调度器上执行，你可以通过参数指定其它的调度器。
	 * 
	 * 2.timeout(long,TimeUnit,Observable)： 这个版本的timeout在超时时会切换到使用一个你指定的备用的Observable，而不
	 * 是发错误通知。它也默认在computation调度器上执行。
	 * 
	 * 3.timeout(Func1)：这个版本的timeout使用一个函数针对原始Observable的每一项返回一个Observable，如果当这个
	 * Observable终止时原始Observable还没有发射另一项数据，就会认为是超时了，timeout就抛出TimeoutException，以一个
	 * 错误通知终止Observable。
	 * 4.timeout(Func1,Observable)： 这个版本的timeout同时指定超时时长和备用的Observable。它默认在immediate调度器上执行
	 * 5.timeout(Func0,Func1)：这个版本的time除了给每一项设置超时，还可以单独给第一项设置一个超时。它默认在immediate调度器上执行。
	 * 6.timeout(Func0,Func1,Observable)： 同上，但是同时可以指定一个备用的Observable。它默认在immediate调度器上执行。
	 * 
	 * 
	 * @throws InterruptedException 
	 */
	private static void test7() throws InterruptedException{
		Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
			@Override
			public void call(Subscriber<? super Integer> subscriber) {
				for (int i = 0; i < 10; i++) {
					try {
						Thread.sleep(i*100);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					subscriber.onNext(i);
				}
				subscriber.onCompleted();
			}
		});
		
		observable
		.timeout(200, TimeUnit.MILLISECONDS, Observable.just(100,200))
        .subscribe(new Subscriber<Integer>() {
        	@Override
		    public void onCompleted() {
		        System.out.println("onCompleted");
		    }

		    @Override
		    public void onError(Throwable e) {
		        System.out.println("onError: " + e.getMessage());
		    }

		    @Override
		    public void onNext(Integer integer) {
		        System.out.println("onNext: " + integer);
		    }
        });
		
		Thread.sleep(10000); 
	}
	
	/**
	 * Timestamp
	 * 它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped的数据的Observable，每一项都包含数据的发射时间。
	 * 也就是把Observable发射的数据重新包装了一下，将数据发射的时间打包一起发射出去，这样观察者不仅能得到数据，还能得到数据的发射时间。 
	 * @throws InterruptedException
	 */
	private static void test8(){
		Observable.just(1,2,3)
        .timestamp()
        .subscribe(new Subscriber<Timestamped<Integer>>() {
        	@Override
			public void onCompleted() {
			}

			@Override
			public void onError(Throwable e) {
			}

			@Override
			public void onNext(Timestamped<Integer> t) {
				System.out.println("onNext: " + t.getValue() + " ,time:" + t.getTimestampMillis());
			}
        });
	}
	
	
	/**
	 * Using
	 * using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源，当Observable终止时这个资源会被自动释放。
	 * 
	 * Javadoc: using(Func0,Func1,Action1)
	 * 
	 * using操作符接受三个参数：
	 * 
	 * 一个用户创建一次性资源的工厂函数（资源类型不限）
	 * 一个用于创建Observable的工厂函数
	 * 一个用于释放资源的函数

	 * @throws InterruptedException
	 * 
	 * test9方法 使用using操作符，丢弃了Observable.from(studentList)发射的数据，使用了在原Observable生命周期内的一次性资源
	 */
	@SuppressWarnings("static-access")
	private static void test9() throws InterruptedException{
		
		Observable.from(studentList)
		.using(new Func0<Integer>() {

			@Override
			public Integer call() {
				 return new Random().nextInt(100);
			}
		}, new Func1<Integer, Observable<String>>() {

			@Override
			public Observable<String> call(Integer t) {
                return Observable.just("西部世界接待员-" + t);

			}
		}, new Action1<Integer>() {

			@Override
			public void call(Integer t) {
				t = null;
			}
		}).subscribe(new Action1<String>() {

			@Override
			public void call(String t) {
				System.out.println("call : " + t);
			}
		});
		
	} 
	

	public static void main(String[] args) throws InterruptedException {
		test9();

	}

}
